SQL API Extensions: Expose planning APIs and make classes public #38951
damccorm wants to merge 11 commits into
…lose, preserve collation, and rename parameter
…l(RelNode, QueryParameters) and expose it in BeamSqlEnv
421b376 to fb42b31
…rtToBeamRel(RelNode, QueryParameters)
399f90a to 097e41f
/gemini review
Code Review
This pull request enhances the Beam SQL Calcite integration by exposing helper methods in BeamSqlEnv and QueryPlanner to parse SQL queries into logical RelNode trees and convert them to physical BeamRelNode trees, allowing logical plan rewriting. It also adds support for custom SQL conformance resolution (such as BABEL for Spark-SQL syntax) and makes RowToCsv public. The review feedback recommends removing the contradictory @VisibleForTesting annotation from the newly public RowToCsv class, adding defensive null checks to several new public methods, clarifying error messages for query parameters, and importing SqlOperatorTable to avoid using its fully qualified name in method signatures.
  @VisibleForTesting
- static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
+ public static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
Since RowToCsv is now being made public specifically to allow external integration with text table serialization, keeping the @VisibleForTesting annotation is contradictory and misleading. It should be removed so that users and static analysis tools do not treat it as an internal-only API.
public static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>

public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable
    getOperatorTable() {
  return planner.getOperatorTable();
}
Using the fully qualified name org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable in the method signature reduces readability. Since other Calcite classes are imported, we should use the simple name SqlOperatorTable instead.
public SqlOperatorTable getOperatorTable() {
  return planner.getOperatorTable();
}

…mRel catch blocks, optimize bindParameters, add null checks in parseToRel and convertToBeamRel, add null checks in registerSchemaFunction and use simple name for SqlOperatorTable, remove @VisibleForTesting from RowToCsv
/gemini review
Code Review
This pull request introduces helper methods in BeamSqlEnv, QueryPlanner, and CalciteQueryPlanner to allow parsing SQL queries into logical plans (RelNode) and converting those logical plans into Beam physical plans (BeamRelNode). It also resolves an issue where the Calcite conformance property was dropped by the Avatica JDBC path by reading it directly from pipeline options. Feedback on these changes highlights two main improvements: clearing the ThreadLocal RelMetadataQuery.THREAD_PROVIDERS in a finally block to prevent memory leaks, and optimizing the bindParameters method to avoid redundant node copies when child inputs do not change.
} finally {
  planner.close();
}
RelMetadataQuery.THREAD_PROVIDERS is a ThreadLocal that is set at the beginning of convertToBeamRel but never cleared. In long-running environments or thread pools, a ThreadLocal that is never cleared can cause classloader memory leaks, because the metadata provider holds references to the RelOptCluster and other heavy objects. We should remove the thread-local value in the finally block.
  } finally {
+   RelMetadataQuery.THREAD_PROVIDERS.remove();
    planner.close();
  }
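As an illustration of the cleanup concern, here is a minimal sketch of scoping a thread-local value with try/finally. It uses a stand-in ThreadLocal<String> rather than Calcite's RelMetadataQuery.THREAD_PROVIDERS, and the class and method names (ThreadLocalScope, withProvider) are hypothetical. Restoring the previous value instead of removing it unconditionally is an assumption of this sketch; whether clearing is safe at all depends on whether later code on the same thread still reads the value.

```java
import java.util.function.Supplier;

final class ThreadLocalScope {
  // Stand-in for a thread-local registry; NOT Calcite's actual field.
  static final ThreadLocal<String> PROVIDER = new ThreadLocal<>();

  // Runs body with the given provider installed, then restores the prior state.
  static <T> T withProvider(String provider, Supplier<T> body) {
    String previous = PROVIDER.get();
    PROVIDER.set(provider);
    try {
      return body.get();
    } finally {
      if (previous == null) {
        // Nothing was installed before: drop the entry so it cannot leak.
        PROVIDER.remove();
      } else {
        // An outer caller installed a value: put it back for them.
        PROVIDER.set(previous);
      }
    }
  }

  public static void main(String[] args) {
    String seen = withProvider("beam", () -> PROVIDER.get());
    System.out.println(seen + " / after: " + PROVIDER.get()); // prints "beam / after: null"
  }
}
```

The finally block runs on both normal return and exception, so a pooled thread does not retain the value after the call.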
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
  RelNode newRel = rel.accept(binder);
  java.util.List<RelNode> inputs = newRel.getInputs();
  java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
  boolean changed = newRel != rel;
  for (RelNode input : inputs) {
    RelNode newInput = bindParameters(input, binder);
    newInputs.add(newInput);
    if (newInput != input) {
      changed = true;
    }
  }
  return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
}
In bindParameters, calling newRel.copy(...) when only the row expressions of newRel changed (but none of its child inputs changed) is redundant because newRel already contains the updated expressions and the original inputs. We should only perform a copy if the child inputs actually changed (inputsChanged is true). This avoids unnecessary node copies during parameter binding.
  private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
    RelNode newRel = rel.accept(binder);
    java.util.List<RelNode> inputs = newRel.getInputs();
    java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
-   boolean changed = newRel != rel;
+   boolean inputsChanged = false;
    for (RelNode input : inputs) {
      RelNode newInput = bindParameters(input, binder);
      newInputs.add(newInput);
      if (newInput != input) {
-       changed = true;
+       inputsChanged = true;
      }
    }
-   return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
+   return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
  }
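The copy-only-when-inputs-change idea can be seen in isolation with a toy tree. The sketch below is not Calcite code: Node, accept, copy, and bind are hypothetical stand-ins for RelNode, RelNode.accept(RexShuttle), RelNode.copy, and bindParameters, and expressions are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

final class CopyOnChange {
  // Immutable toy plan node: one expression plus child inputs.
  static final class Node {
    final String expr;
    final List<Node> inputs;

    Node(String expr, List<Node> inputs) {
      this.expr = expr;
      this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
    }

    // Rewrites this node's own expression only; returns this if nothing changed.
    Node accept(UnaryOperator<String> shuttle) {
      String rewritten = shuttle.apply(expr);
      return rewritten.equals(expr) ? this : new Node(rewritten, inputs);
    }

    // Same expression, different children.
    Node copy(List<Node> newInputs) {
      return new Node(expr, newInputs);
    }
  }

  static Node node(String expr, Node... inputs) {
    return new Node(expr, Arrays.asList(inputs));
  }

  // Rewrites the whole tree, copying a node only when a child was replaced.
  static Node bind(Node node, UnaryOperator<String> shuttle) {
    Node rewritten = node.accept(shuttle);
    List<Node> newInputs = new ArrayList<>(rewritten.inputs.size());
    boolean inputsChanged = false;
    for (Node input : rewritten.inputs) {
      Node newInput = bind(input, shuttle);
      newInputs.add(newInput);
      inputsChanged |= newInput != input;
    }
    // rewritten already carries the updated expression and the original inputs.
    return inputsChanged ? rewritten.copy(newInputs) : rewritten;
  }

  public static void main(String[] args) {
    Node root = node("join", node("scan a"), node("filter x = ?0"));
    Node bound = bind(root, e -> e.replace("?0", "42"));
    System.out.println(bound.inputs.get(1).expr); // prints "filter x = 42"
  }
}
```

Untouched subtrees keep their identity, so a rewrite that changes nothing returns the original root unchanged.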
… during translation Remove early cleanup of THREAD_PROVIDERS in convertToBeamRel(RelNode, @Nullable RelCollation) to prevent NullPointerException: metadataHandlerProvider when nodes query metadata during pipeline translation/assembly. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599
85ed76b to d9b8938
… variable assignment Fix compileJava issues for Java 26 environments by adding a fallback to JDK 21 in the gradlew script. Remove assignment of non-existent variable previousThreadProviders in CalciteQueryPlanner.java to fix branch compilation. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599
Set maxParallelForks = 1 for all Test tasks when running on Java 17+. On Java 17+, running tests in parallel can corrupt the binary test results output store (causing EOFException/Buffer underflow crashes during report generation). Running tests sequentially on newer JDKs fixes this. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599
Undo recent changes to gradlew and BeamModulePlugin.groovy, allowing builds to be configured through standard environment variables (e.g. JAVA_HOME) or gradle properties instead of modifying the repository's files directly. TAG=agy CONV=0df243da-2867-4795-9889-6334ba7d1599
Description
This PR is split from #38866. It focuses on exposing Beam SQL's planning and optimization infrastructure as an extensible API.
Previously, Beam SQL's planning stages (via Calcite) were mostly internal and tightly coupled to executing a full SQL string end-to-end. This PR refactors and exposes these planning stages to allow external orchestration of Beam SQL.
Key Changes
- Added parseLogicalPlan(String query) / parseToRel(...) to BeamSqlEnv and QueryPlanner to allow parsing a SQL query string into a Calcite logical plan (RelNode) without immediately optimizing or executing it.
- Added convertToBeamRel(RelNode logicalPlan) to allow taking an externally constructed or manipulated Calcite logical plan (RelNode) and converting it into a Beam physical plan (BeamRelNode / PCollection pipeline).
- Made the BeamCalciteTable constructor public to allow external planners to instantiate it.
- Made the TextTableProvider.RowToCsv class public to allow external integration with text table serialization.
- Added testParseAndConvertHelpers in CalciteQueryPlannerTest.java, which exercises these new APIs end-to-end.
Why this is needed
This is a crucial feature for external query engines or orchestrators (such as Spark Connect or custom SQL platforms). They can now use Beam's SQL parser to get a logical plan, perform their own optimizations or integrations, and then hand it back to Beam to generate the final executable pipeline.
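The parse, rewrite, convert flow described here can be sketched with a self-contained toy model. This is not Beam's API: LogicalPlan, PhysicalPlan, Planner, and toyPlanner are hypothetical stand-ins for RelNode, BeamRelNode, and the BeamSqlEnv / QueryPlanner entry points named in this PR, whose exact signatures are not reproduced here.

```java
import java.util.function.UnaryOperator;

final class ExternalPlanningSketch {
  // Stand-in for a logical plan (RelNode in the PR).
  static final class LogicalPlan {
    final String tree;
    LogicalPlan(String tree) { this.tree = tree; }
  }

  // Stand-in for a physical plan (BeamRelNode in the PR).
  static final class PhysicalPlan {
    final String tree;
    PhysicalPlan(String tree) { this.tree = tree; }
  }

  // Hypothetical facade exposing the two planning stages separately.
  interface Planner {
    LogicalPlan parse(String sql);
    PhysicalPlan convert(LogicalPlan plan);
  }

  // The external engine owns the middle step: it rewrites the logical plan
  // between parsing and conversion.
  static PhysicalPlan plan(Planner planner, String sql, UnaryOperator<LogicalPlan> rewrite) {
    LogicalPlan logical = planner.parse(sql);
    LogicalPlan rewritten = rewrite.apply(logical);
    return planner.convert(rewritten);
  }

  // Trivial in-memory planner used only to make the sketch runnable.
  static Planner toyPlanner() {
    return new Planner() {
      @Override public LogicalPlan parse(String sql) {
        return new LogicalPlan("Logical(" + sql + ")");
      }
      @Override public PhysicalPlan convert(LogicalPlan plan) {
        return new PhysicalPlan("Beam[" + plan.tree + "]");
      }
    };
  }

  public static void main(String[] args) {
    PhysicalPlan result =
        plan(toyPlanner(), "SELECT 1", p -> new LogicalPlan("Rewritten:" + p.tree));
    System.out.println(result.tree); // prints "Beam[Rewritten:Logical(SELECT 1)]"
  }
}
```

Keeping the rewrite as a caller-supplied function means the planner does not need to know anything about the external engine's optimizations.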